package initialize

import (
	"context"
	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"go.uber.org/zap"
	"os"
)

// InitConsumer creates a RocketMQ push consumer, subscribes it to a single
// topic and starts it.
//
// Parameters:
//   - nameServer: address of the RocketMQ name server (e.g. "127.0.0.1:9876").
//   - groupName: consumer group to join (e.g. "ygshop-order"). Per the original
//     author's note, sharing one group across instances is meant to avoid
//     duplicate consumption in multi-instance deployments.
//   - topicName: topic to subscribe to (e.g. "order_timeout").
//   - f: callback passed to Subscribe as the message handler.
//
// Any failure while creating, subscribing or starting the consumer is logged
// through zap's Panic, so the function only returns normally once the consumer
// has started. The consumer handle is not returned to the caller.
func InitConsumer(nameServer, groupName, topicName string, f func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) {
	nameServers := []string{nameServer}
	pushConsumer, newErr := rocketmq.NewPushConsumer(
		consumer.WithNameServer(nameServers),
		consumer.WithGroupName(groupName),
	)
	if newErr != nil {
		zap.S().Panic("实例化consumer失败:", newErr)
	}

	// Running `export ROCKETMQ_GO_LOG_LEVEL=error` in the shell silences the
	// RocketMQ client's info-level output; log the current value to make the
	// effective setting visible when debugging.
	zap.S().Debug("ROCKETMQ_GO_LOG_LEVEL:", os.Getenv("ROCKETMQ_GO_LOG_LEVEL"))

	// An empty selector subscribes to the topic without any tag/SQL filter set.
	subscribeErr := pushConsumer.Subscribe(topicName, consumer.MessageSelector{}, f)
	if subscribeErr != nil {
		zap.S().Panic("读取消息失败:", subscribeErr)
	}

	if startErr := pushConsumer.Start(); startErr != nil {
		zap.S().Panic("启动consumer失败:", startErr)
	}
	zap.S().Debug("启动consumer成功")
}
